package drds.plus.executor.transaction.cobar_style;

import drds.plus.datanode.api.Connection;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.executor.transaction.AbstractConnectionHolder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.sql.SQLException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Connection holder that keeps, per data node id, a queue of connections that
 * were handed back through {@link #tryClose(String, Connection)} so that a later
 * {@link #getConnection(String, DatasourceManager)} call for the same data node
 * can reuse them instead of asking the {@link DatasourceManager} for a new one.
 *
 * <p>Every connection obtained from a {@link DatasourceManager} is also added to
 * the inherited {@code connectionSet}; the per-node queues are dropped whenever
 * {@link #closeConnectionSet()} or {@link #cancel()} is invoked.
 */
@Slf4j
public class CobarStyleConnectionHolder extends AbstractConnectionHolder {
    /**
     * Exposes the logger generated for this class by {@code @Slf4j}.
     *
     * @return this class's logger
     */
    public Logger log() {
        return log;
    }

    /** Returned (reusable) connections, keyed by data node id. */
    private final ConcurrentMap<String, Queue<Connection>> dataNodeIdToConnectionQueueMap = new ConcurrentHashMap<String, Queue<Connection>>();

    public CobarStyleConnectionHolder() {

    }

    /**
     * Returns a connection for the given data node, preferring one that was
     * previously handed back via {@link #tryClose(String, Connection)}.
     *
     * <p>If no returned connection is queued for {@code dataNodeId}, a new one is
     * requested from {@code datasourceManager} and recorded in the inherited
     * {@code connectionSet} before being returned.
     *
     * @param dataNodeId        key of the queue to look in; must not be {@code null}
     * @param datasourceManager source of new connections when the queue is empty
     * @return a queued connection if one is available, otherwise whatever
     *         {@code datasourceManager.getConnection()} returns
     * @throws SQLException if obtaining a new connection fails
     */
    public Connection getConnection(String dataNodeId, DatasourceManager datasourceManager) throws SQLException {
        Queue<Connection> connectionQueue = this.dataNodeIdToConnectionQueueMap.get(dataNodeId);

        if (connectionQueue == null) {
            // putIfAbsent is atomic: if another thread registered a queue for this
            // node first, use theirs so returned connections are never stranded in
            // a queue that got overwritten.
            Queue<Connection> createdQueue = new LinkedBlockingQueue<Connection>();
            Queue<Connection> existingQueue = this.dataNodeIdToConnectionQueueMap.putIfAbsent(dataNodeId, createdQueue);
            connectionQueue = existingQueue != null ? existingQueue : createdQueue;
        }

        // A single poll() replaces the isEmpty()/poll() pair: with a concurrent
        // consumer the queue may drain between the two calls, which would have
        // returned null to the caller.
        Connection pooledConnection = connectionQueue.poll();
        if (pooledConnection != null) {
            return pooledConnection;
        }

        Connection connection = datasourceManager.getConnection();
        this.connectionSet.add(connection);

        return connection;

    }

    /**
     * Hands a connection back for reuse by queueing it under {@code dataNodeId};
     * the connection itself is not closed here.
     *
     * @param dataNodeId key whose queue receives the connection
     * @param connection the connection being handed back
     * @throws IllegalAccessError if no queue exists for {@code dataNodeId}, i.e.
     *         {@link #getConnection(String, DatasourceManager)} was never called
     *         for it on this holder (or the queues were cleared since)
     */
    public void tryClose(String dataNodeId, Connection connection) {
        Queue<Connection> connectionQueue = this.dataNodeIdToConnectionQueueMap.get(dataNodeId);

        if (connectionQueue == null) {
            // Error type kept for compatibility with existing callers; the message
            // now identifies the offending data node.
            throw new IllegalAccessError("impossible: no connection queue for dataNodeId=" + dataNodeId);
        }
        connectionQueue.offer(connection);

    }

    /**
     * Delegates to the superclass's {@code closeConnectionSet()} and then forgets
     * all per-node queues so stale connections cannot be handed out again.
     */
    public void closeConnectionSet() {
        super.closeConnectionSet();
        this.dataNodeIdToConnectionQueueMap.clear();
    }

    /**
     * Delegates to the superclass's {@code cancel()} and then forgets all
     * per-node queues.
     */
    public void cancel() {
        super.cancel();
        this.dataNodeIdToConnectionQueueMap.clear();
    }
}
